package com.bizmda.bizsip.sink.connector;

import cn.hutool.core.util.IdUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.bizmda.bizsip.common.BizException;
import com.bizmda.bizsip.common.BizResultEnum;
import com.bizmda.bizsip.config.AbstractSinkConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Sink connector that forwards a message to RabbitMQ through a
 * {@link RabbitTemplate} request/reply call and hands the reply back to the
 * caller.
 *
 * <p>The target exchange and routing key are read from the sink's connector
 * map under the keys {@code exchange} and {@code routing-key}.
 *
 * @author 史正烨
 */
@Slf4j
public class RabbitmqSinkConnector extends AbstractSinkConnector {
    /** Template used for the send-and-receive call; resolved in {@link #init}. */
    private RabbitTemplate rabbitTemplate;
    /** Value of the {@code routing-key} connector setting. */
    private String routingKey;
    /** Value of the {@code exchange} connector setting. */
    private String exchange;

    /**
     * Initializes the connector: delegates to the parent, reads the exchange
     * and routing key from the connector map, and looks up the
     * {@code rabbitTemplate} bean if this instance does not hold one yet.
     *
     * @param sinkConfig sink configuration carrying the connector map
     * @throws BizException declared by the overridden method
     */
    @Override
    public void init(AbstractSinkConfig sinkConfig) throws BizException {
        super.init(sinkConfig);
        this.exchange = connectorSetting(sinkConfig, "exchange");
        this.routingKey = connectorSetting(sinkConfig, "routing-key");
        log.info("初始化RabbitmqSinkConnector:exchange[{}],routing-key[{}]", this.exchange, this.routingKey);
        if (this.rabbitTemplate != null) {
            // Already resolved by an earlier init() on this instance.
            return;
        }
        this.rabbitTemplate = SpringUtil.getBean("rabbitTemplate");
    }

    /**
     * Sends {@code inMessage} to the configured exchange/routing key and
     * waits for the reply. Each call is tagged with a freshly generated
     * correlation id.
     *
     * @param inMessage payload handed to the template for conversion and sending
     * @return the reply, cast to {@code byte[]}
     * @throws BizException with {@link BizResultEnum#SINK_MQ_TIMEOUT} when the
     *                      template returns no reply
     */
    @Override
    public Object process(Object inMessage) throws BizException {
        final String correlationId = IdUtil.simpleUUID();
        final CorrelationData correlation = new CorrelationData(correlationId);
        log.debug("调用RabbitmqSinkConnector的process(),simpleUUID:{},routeKey:{}", correlationId, this.routingKey);

        final Object reply = this.rabbitTemplate.convertSendAndReceive(
                this.exchange, this.routingKey, inMessage, correlation);
        if (reply != null) {
            return (byte[]) reply;
        }
        // A null reply is reported to the caller as an MQ timeout.
        throw new BizException(BizResultEnum.SINK_MQ_TIMEOUT);
    }

    /** Reads one entry of the connector map and casts it to {@code String}. */
    private static String connectorSetting(AbstractSinkConfig sinkConfig, String key) {
        return (String) sinkConfig.getConnectorMap().get(key);
    }
}
